{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Group Chat with Retrieval Augmented Generation\n",
    "\n",
    "AutoGen supports conversable agents powered by LLMs, tools, or humans, performing tasks collectively via automated chat. This framework allows tool use and human participation through multi-agent conversation.\n",
    "Please find documentation about this feature [here](https://microsoft.github.io/autogen/docs/Use-Cases/agent_chat).\n",
    "\n",
    "````{=mdx}\n",
    ":::info Requirements\n",
    "Some extra dependencies are needed for this notebook, which can be installed via pip:\n",
    "\n",
    "```bash\n",
    "pip install pyautogen[retrievechat]\n",
    "```\n",
    "\n",
    "For more information, please refer to the [installation guide](/docs/installation/).\n",
    ":::\n",
    "````"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set your API Endpoint\n",
    "\n",
    "The [`config_list_from_json`](https://microsoft.github.io/autogen/docs/reference/oai/openai_utils#config_list_from_json) function loads a list of configurations from an environment variable or a JSON file."
   ]
  },
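  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the lookup concrete, here is a minimal sketch of loading a configuration list from an environment variable or a JSON file. It is a simplified stand-in rather than AutoGen's implementation: the helper `load_config_list`, the names `DEMO_CONFIG_LIST` and `DEMO_CONFIG_FILE`, the placeholder keys, and the lookup order (environment variable first, file second) are all assumptions made for illustration.\n",
    "\n",
    "```python\n",
    "import json\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "\n",
    "def load_config_list(env_or_file, file_location=''):\n",
    "    # Hypothetical helper, not the AutoGen function itself.\n",
    "    env_value = os.environ.get(env_or_file)\n",
    "    if env_value:\n",
    "        if os.path.exists(env_value):\n",
    "            # The variable holds a path to a JSON file.\n",
    "            with open(env_value) as f:\n",
    "                return json.load(f)\n",
    "        # Otherwise treat the value of the variable as a JSON string.\n",
    "        return json.loads(env_value)\n",
    "    # No such variable: fall back to a file with that name.\n",
    "    with open(os.path.join(file_location, env_or_file)) as f:\n",
    "        return json.load(f)\n",
    "\n",
    "\n",
    "# Placeholder entries; the keys are not real credentials.\n",
    "sample = [\n",
    "    {'model': 'gpt4-1106-preview', 'api_key': 'sk-placeholder'},\n",
    "    {'model': 'gpt-35-turbo', 'api_key': 'sk-placeholder'},\n",
    "]\n",
    "\n",
    "# Case 1: the environment variable carries the JSON itself.\n",
    "os.environ['DEMO_CONFIG_LIST'] = json.dumps(sample)\n",
    "from_env = load_config_list('DEMO_CONFIG_LIST')\n",
    "\n",
    "# Case 2: the variable is not set, so a file with that name is read instead.\n",
    "with tempfile.TemporaryDirectory() as tmp:\n",
    "    with open(os.path.join(tmp, 'DEMO_CONFIG_FILE'), 'w') as f:\n",
    "        json.dump(sample, f)\n",
    "    from_file = load_config_list('DEMO_CONFIG_FILE', file_location=tmp)\n",
    "\n",
    "print([cfg['model'] for cfg in from_env])\n",
    "print(from_env == from_file)\n",
    "```\n",
    "\n",
    "Either source yields the same list of dictionaries, each with at least a `model` entry, which is the shape the next cell reads."
   ]
  },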
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "LLM models:  ['gpt4-1106-preview', 'gpt-35-turbo', 'gpt-35-turbo-0613']\n"
     ]
    }
   ],
   "source": [
    "import chromadb\n",
    "from typing_extensions import Annotated\n",
    "\n",
    "import autogen\n",
    "from autogen import AssistantAgent\n",
    "from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent\n",
    "\n",
    "config_list = autogen.config_list_from_json(\"OAI_CONFIG_LIST\")\n",
    "\n",
    "print(\"LLM models: \", [cfg[\"model\"] for cfg in config_list])"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "````{=mdx}\n",
    ":::tip\n",
    "Learn more about configuring LLMs for agents [here](/docs/topics/llm_configuration).\n",
    ":::\n",
    "````\n",
    "\n",
    "## Construct Agents"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "def termination_msg(x):\n",
    "    return isinstance(x, dict) and \"TERMINATE\" == str(x.get(\"content\", \"\"))[-9:].upper()\n",
    "\n",
    "\n",
    "llm_config = {\"config_list\": config_list, \"timeout\": 60, \"temperature\": 0.8, \"seed\": 1234}\n",
    "\n",
    "boss = autogen.UserProxyAgent(\n",
    "    name=\"Boss\",\n",
    "    is_termination_msg=termination_msg,\n",
    "    human_input_mode=\"NEVER\",\n",
    "    code_execution_config=False,  # we don't want to execute code in this case.\n",
    "    default_auto_reply=\"Reply `TERMINATE` if the task is done.\",\n",
    "    description=\"The boss who asks questions and gives tasks.\",\n",
    ")\n",
    "\n",
    "boss_aid = RetrieveUserProxyAgent(\n",
    "    name=\"Boss_Assistant\",\n",
    "    is_termination_msg=termination_msg,\n",
    "    human_input_mode=\"NEVER\",\n",
    "    default_auto_reply=\"Reply `TERMINATE` if the task is done.\",\n",
    "    max_consecutive_auto_reply=3,\n",
    "    retrieve_config={\n",
    "        \"task\": \"code\",\n",
    "        \"docs_path\": \"https://raw.githubusercontent.com/microsoft/FLAML/main/website/docs/Examples/Integrate%20-%20Spark.md\",\n",
    "        \"chunk_token_size\": 1000,\n",
    "        \"model\": config_list[0][\"model\"],\n",
    "        \"collection_name\": \"groupchat\",\n",
    "        \"get_or_create\": True,\n",
    "    },\n",
    "    code_execution_config=False,  # we don't want to execute code in this case.\n",
    "    description=\"Assistant who has extra content retrieval power for solving difficult problems.\",\n",
    ")\n",
    "\n",
    "coder = AssistantAgent(\n",
    "    name=\"Senior_Python_Engineer\",\n",
    "    is_termination_msg=termination_msg,\n",
    "    system_message=\"You are a senior python engineer, you provide python code to answer questions. Reply `TERMINATE` in the end when everything is done.\",\n",
    "    llm_config=llm_config,\n",
    "    description=\"Senior Python Engineer who can write code to solve problems and answer questions.\",\n",
    ")\n",
    "\n",
    "pm = autogen.AssistantAgent(\n",
    "    name=\"Product_Manager\",\n",
    "    is_termination_msg=termination_msg,\n",
    "    system_message=\"You are a product manager. Reply `TERMINATE` in the end when everything is done.\",\n",
    "    llm_config=llm_config,\n",
    "    description=\"Product Manager who can design and plan the project.\",\n",
    ")\n",
    "\n",
    "reviewer = autogen.AssistantAgent(\n",
    "    name=\"Code_Reviewer\",\n",
    "    is_termination_msg=termination_msg,\n",
    "    system_message=\"You are a code reviewer. Reply `TERMINATE` in the end when everything is done.\",\n",
    "    llm_config=llm_config,\n",
    "    description=\"Code Reviewer who can review the code.\",\n",
    ")\n",
    "\n",
    "PROBLEM = \"How to use spark for parallel training in FLAML? Give me sample code.\"\n",
    "\n",
    "\n",
    "def _reset_agents():\n",
    "    boss.reset()\n",
    "    boss_aid.reset()\n",
    "    coder.reset()\n",
    "    pm.reset()\n",
    "    reviewer.reset()\n",
    "\n",
    "\n",
    "def rag_chat():\n",
    "    _reset_agents()\n",
    "    groupchat = autogen.GroupChat(\n",
    "        agents=[boss_aid, pm, coder, reviewer], messages=[], max_round=12, speaker_selection_method=\"round_robin\"\n",
    "    )\n",
    "    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)\n",
    "\n",
    "    # Start chatting with boss_aid as this is the user proxy agent.\n",
    "    boss_aid.initiate_chat(\n",
    "        manager,\n",
    "        message=boss_aid.message_generator,\n",
    "        problem=PROBLEM,\n",
    "        n_results=3,\n",
    "    )\n",
    "\n",
    "\n",
    "def norag_chat():\n",
    "    _reset_agents()\n",
    "    groupchat = autogen.GroupChat(\n",
    "        agents=[boss, pm, coder, reviewer],\n",
    "        messages=[],\n",
    "        max_round=12,\n",
    "        speaker_selection_method=\"auto\",\n",
    "        allow_repeat_speaker=False,\n",
    "    )\n",
    "    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)\n",
    "\n",
    "    # Start chatting with the boss as this is the user proxy agent.\n",
    "    boss.initiate_chat(\n",
    "        manager,\n",
    "        message=PROBLEM,\n",
    "    )\n",
    "\n",
    "\n",
    "def call_rag_chat():\n",
    "    _reset_agents()\n",
    "\n",
    "    # In this case, there are multiple user proxy agents and the chat is not initiated\n",
    "    # by the RAG user proxy agent.\n",
    "    # To use the RAG user proxy agent, we wrap its retrieval in a function that the\n",
    "    # other agents can call.\n",
    "    def retrieve_content(\n",
    "        message: Annotated[\n",
    "            str,\n",
    "            \"Refined message which keeps the original meaning and can be used to retrieve content for code generation and question answering.\",\n",
    "        ],\n",
    "        n_results: Annotated[int, \"number of results\"] = 3,\n",
    "    ) -> str:\n",
    "        boss_aid.n_results = n_results  # Set the number of results to be retrieved.\n",
    "        # Check if we need to update the context.\n",
    "        update_context_case1, update_context_case2 = boss_aid._check_update_context(message)\n",
    "        if (update_context_case1 or update_context_case2) and boss_aid.update_context:\n",
    "            boss_aid.problem = message if not hasattr(boss_aid, \"problem\") else boss_aid.problem\n",
    "            _, ret_msg = boss_aid._generate_retrieve_user_reply(message)\n",
    "        else:\n",
    "            _context = {\"problem\": message, \"n_results\": n_results}\n",
    "            ret_msg = boss_aid.message_generator(boss_aid, None, _context)\n",
    "        return ret_msg if ret_msg else message\n",
    "\n",
    "    boss_aid.human_input_mode = \"NEVER\"  # Disable human input for boss_aid since it only retrieves content.\n",
    "\n",
    "    for caller in [pm, coder, reviewer]:\n",
    "        d_retrieve_content = caller.register_for_llm(\n",
    "            description=\"retrieve content for code generation and question answering.\", api_style=\"function\"\n",
    "        )(retrieve_content)\n",
    "\n",
    "    for executor in [boss, pm]:\n",
    "        executor.register_for_execution()(d_retrieve_content)\n",
    "\n",
    "    groupchat = autogen.GroupChat(\n",
    "        agents=[boss, pm, coder, reviewer],\n",
    "        messages=[],\n",
    "        max_round=12,\n",
    "        speaker_selection_method=\"round_robin\",\n",
    "        allow_repeat_speaker=False,\n",
    "    )\n",
    "\n",
    "    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)\n",
    "\n",
    "    # Start chatting with the boss as this is the user proxy agent.\n",
    "    boss.initiate_chat(\n",
    "        manager,\n",
    "        message=PROBLEM,\n",
    "    )"
   ]
  },
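  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `termination_msg` check defined above decides when an agent treats a message as the end of the conversation. The sketch below repeats the function so it can run on its own and tries it on a few hypothetical messages; the sample messages are made up for illustration and do not come from a real chat.\n",
    "\n",
    "```python\n",
    "def termination_msg(x):\n",
    "    # Same check as above: a dict whose content ends with TERMINATE, in any letter case.\n",
    "    return isinstance(x, dict) and 'TERMINATE' == str(x.get('content', ''))[-9:].upper()\n",
    "\n",
    "\n",
    "# Hypothetical messages, shaped like the dicts that agents exchange.\n",
    "samples = [\n",
    "    {'content': 'All done. TERMINATE'},  # ends with the keyword\n",
    "    {'content': 'terminate'},  # lower case still matches\n",
    "    {'content': 'TERMINATE.'},  # trailing punctuation breaks the match\n",
    "    {'content': None},  # empty content, as in some tool-call messages\n",
    "    'TERMINATE',  # a bare string is not a dict\n",
    "]\n",
    "\n",
    "for msg in samples:\n",
    "    print(repr(msg), termination_msg(msg))\n",
    "```\n",
    "\n",
    "Only the first two samples count as termination. Because the check compares the last nine characters exactly, anything after the keyword keeps the chat going, which is why the system messages ask agents to reply `TERMINATE` at the very end."
   ]
  },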
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start Chat\n",
    "\n",
    "### UserProxyAgent doesn't get the correct code\n",
    "[FLAML](https://github.com/microsoft/FLAML) was open sourced in 2020, so ChatGPT is familiar with it. However, Spark-related APIs were added in 2022, so they were not in ChatGPT's training data. As a result, we end up with invalid code."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[33mBoss\u001b[0m (to chat_manager):\n",
      "\n",
      "How to use spark for parallel training in FLAML? Give me sample code.\n",
      "\n",
      "--------------------------------------------------------------------------------\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n",
      "\n",
      "To use Spark for parallel training in FLAML (Fast and Lightweight AutoML), you would need to set up a Spark cluster and utilize the `spark` backend for joblib, which FLAML uses internally for parallel training. Here’s an example of how you might set up and use Spark with FLAML for AutoML tasks:\n",
      "\n",
      "Firstly, ensure that you have the Spark cluster set up and the `pyspark` and `joblib-spark` packages installed in your environment. You can install the required packages using pip if they are not already installed:\n",
      "\n",
      "```python\n",
      "!pip install flaml pyspark joblib-spark\n",
      "```\n",
      "\n",
      "Here's a sample code snippet that demonstrates how to use FLAML with Spark for parallel training:\n",
      "\n",
      "```python\n",
      "from flaml import AutoML\n",
      "from pyspark.sql import SparkSession\n",
      "from sklearn.datasets import load_digits\n",
      "from joblibspark import register_spark\n",
      "\n",
      "# Initialize a Spark session\n",
      "spark = SparkSession.builder \\\n",
      "    .master(\"local[*]\") \\\n",
      "    .appName(\"FLAML_Spark_Example\") \\\n",
      "    .getOrCreate()\n",
      "\n",
      "# Register the joblib spark backend\n",
      "register_spark()  # This registers the backend for parallel processing\n",
      "\n",
      "# Load sample data\n",
      "X, y = load_digits(return_X_y=True)\n",
      "\n",
      "# Initialize an AutoML instance\n",
      "automl = AutoML()\n",
      "\n",
      "# Define the settings for the AutoML run\n",
      "settings = {\n",
      "    \"time_budget\": 60,  # Total running time in seconds\n",
      "    \"metric\": 'accuracy',  # Primary metric for evaluation\n",
      "    \"task\": 'classification',  # Task type\n",
      "    \"n_jobs\": -1,  # Number of jobs to run in parallel (use -1 for all)\n",
      "    \"estimator_list\": ['lgbm', 'rf', 'xgboost'],  # List of estimators to consider\n",
      "    \"log_file_name\": \"flaml_log.txt\",  # Log file name\n",
      "}\n",
      "\n",
      "# Run the AutoML search with Spark backend\n",
      "automl.fit(X_train=X, y_train=y, **settings)\n",
      "\n",
      "# Output the best model and its performance\n",
      "print(f\"Best ML model: {automl.model}\")\n",
      "print(f\"Best ML model's accuracy: {automl.best_loss}\")\n",
      "\n",
      "# Stop the Spark session\n",
      "spark.stop()\n",
      "```\n",
      "\n",
      "The `register_spark()` function from `joblib-spark` is used to register the Spark backend with joblib, which is utilized for parallel training within FLAML. The `n_jobs=-1` parameter tells FLAML to use all available Spark executors for parallel training.\n",
      "\n",
      "Please note that the actual process of setting up a Spark cluster can be complex and might involve additional steps such as configuring Spark workers, allocating resources, and more, which are beyond the scope of this code snippet.\n",
      "\n",
      "If you encounter any issues or need to adjust configurations for your specific Spark setup, please refer to the Spark and FLAML documentation for more details.\n",
      "\n",
      "When you run the code, ensure that your Spark cluster is properly configured and accessible from your Python environment. Adjust the `.master(\"local[*]\")` to point to your Spark master's URL if you are running a cluster that is not local.\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n",
      "\n",
      "TERMINATE\n",
      "\n",
      "--------------------------------------------------------------------------------\n"
     ]
    }
   ],
   "source": [
    "norag_chat()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### RetrieveUserProxyAgent gets the correct code\n",
    "Since RetrieveUserProxyAgent can perform retrieval-augmented generation based on the given documentation file, ChatGPT can generate the correct code for us!"
   ]
  },
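  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough picture of what retrieval-augmented generation does before the LLM is called, the toy sketch below ranks documentation chunks against the question and places the best ones in the prompt. It is not how `RetrieveUserProxyAgent` works internally: the agent queries a vector database, whereas this sketch scores chunks by plain word overlap. The helpers `score`, `retrieve` and `build_prompt`, the two sample chunks, and the prompt layout are assumptions made for illustration.\n",
    "\n",
    "```python\n",
    "def score(question, chunk):\n",
    "    # Toy relevance score: the number of words shared by question and chunk.\n",
    "    q_words = set(question.lower().split())\n",
    "    return len(q_words & set(chunk.lower().split()))\n",
    "\n",
    "\n",
    "def retrieve(question, chunks, n_results=3):\n",
    "    # Never ask for more results than there are chunks.\n",
    "    n_results = min(n_results, len(chunks))\n",
    "    ranked = sorted(chunks, key=lambda c: score(question, c), reverse=True)\n",
    "    return ranked[:n_results]\n",
    "\n",
    "\n",
    "def build_prompt(question, context_chunks):\n",
    "    # Two-part prompt: the question first, then the retrieved context.\n",
    "    context = ' '.join(context_chunks)\n",
    "    return 'Question: {} Context: {}'.format(question, context)\n",
    "\n",
    "\n",
    "question = 'How to use spark for parallel training in FLAML?'\n",
    "\n",
    "# Hypothetical chunks, paraphrased from the FLAML Spark documentation.\n",
    "chunks = [\n",
    "    'Set use_spark to True to run trials as parallel Spark jobs.',\n",
    "    'Set n_concurrent_trials above 1 to enable parallel tuning.',\n",
    "]\n",
    "\n",
    "top = retrieve(question, chunks, n_results=3)\n",
    "prompt = build_prompt(question, top)\n",
    "\n",
    "print(len(top))\n",
    "print(prompt)\n",
    "```\n",
    "\n",
    "Asking for three results when only two chunks exist returns two, which parallels the log line in the output below where the requested `n_results` is reduced to the number of stored chunks."
   ]
  },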
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2024-04-07 18:26:04,562 - autogen.agentchat.contrib.retrieve_user_proxy_agent - INFO - \u001b[32mUse the existing collection `groupchat`.\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Trying to create collection.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2024-04-07 18:26:05,485 - autogen.agentchat.contrib.retrieve_user_proxy_agent - INFO - Found 1 chunks.\u001b[0m\n",
      "Number of requested results 3 is greater than number of elements in index 1, updating n_results = 1\n",
      "Model gpt4-1106-preview not found. Using cl100k_base encoding.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "VectorDB returns doc_ids:  [['bdfbc921']]\n",
      "\u001b[32mAdding content of doc bdfbc921 to context.\u001b[0m\n",
      "\u001b[33mBoss_Assistant\u001b[0m (to chat_manager):\n",
      "\n",
      "You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the\n",
      "context provided by the user.\n",
      "If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.\n",
      "For code generation, you must obey the following rules:\n",
      "Rule 1. You MUST NOT install any packages because all the packages needed are already installed.\n",
      "Rule 2. You must follow the formats below to write your code:\n",
      "```language\n",
      "# your code\n",
      "```\n",
      "\n",
      "User's question is: How to use spark for parallel training in FLAML? Give me sample code.\n",
      "\n",
      "Context is: # Integrate - Spark\n",
      "\n",
      "FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:\n",
      "\n",
      "- Use Spark ML estimators for AutoML.\n",
      "- Use Spark to run training in parallel spark jobs.\n",
      "\n",
      "## Spark ML Estimators\n",
      "\n",
      "FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.\n",
      "\n",
      "### Data\n",
      "\n",
      "For Spark estimators, AutoML only consumes Spark data. FLAML provides a convenient function `to_pandas_on_spark` in the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark (`pyspark.pandas`) dataframe/series, which Spark estimators require.\n",
      "\n",
      "This utility function takes data in the form of a `pandas.Dataframe` or `pyspark.sql.Dataframe` and converts it into a pandas-on-spark dataframe. It also takes `pandas.Series` or `pyspark.sql.Dataframe` and converts it into a [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) series. If you pass in a `pyspark.pandas.Dataframe`, it will not make any changes.\n",
      "\n",
      "This function also accepts optional arguments `index_col` and `default_index_type`.\n",
      "\n",
      "- `index_col` is the column name to use as the index, default is None.\n",
      "- `default_index_type` is the default index type, default is \"distributed-sequence\". More info about default index type could be found on Spark official [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)\n",
      "\n",
      "Here is an example code snippet for Spark Data:\n",
      "\n",
      "```python\n",
      "import pandas as pd\n",
      "from flaml.automl.spark.utils import to_pandas_on_spark\n",
      "\n",
      "# Creating a dictionary\n",
      "data = {\n",
      "    \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n",
      "    \"Age_Years\": [20, 15, 10, 7, 25],\n",
      "    \"Price\": [100000, 200000, 300000, 240000, 120000],\n",
      "}\n",
      "\n",
      "# Creating a pandas DataFrame\n",
      "dataframe = pd.DataFrame(data)\n",
      "label = \"Price\"\n",
      "\n",
      "# Convert to pandas-on-spark dataframe\n",
      "psdf = to_pandas_on_spark(dataframe)\n",
      "```\n",
      "\n",
      "To use Spark ML models you need to format your data appropriately. Specifically, use [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to merge all feature columns into a single vector column.\n",
      "\n",
      "Here is an example of how to use it:\n",
      "\n",
      "```python\n",
      "from pyspark.ml.feature import VectorAssembler\n",
      "\n",
      "columns = psdf.columns\n",
      "feature_cols = [col for col in columns if col != label]\n",
      "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
      "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n",
      "```\n",
      "\n",
      "Later in conducting the experiment, use your pandas-on-spark data like non-spark data and pass them using `X_train, y_train` or `dataframe, label`.\n",
      "\n",
      "### Estimators\n",
      "\n",
      "#### Model List\n",
      "\n",
      "- `lgbm_spark`: The class for fine-tuning Spark version LightGBM models, using [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API.\n",
      "\n",
      "#### Usage\n",
      "\n",
      "First, prepare your data in the required format as described in the previous section.\n",
      "\n",
      "By including the models you intend to try in the `estimators_list` argument to `flaml.automl`, FLAML will start trying configurations for these models. If your input is Spark data, FLAML will also use estimators with the `_spark` postfix by default, even if you haven't specified them.\n",
      "\n",
      "Here is an example code snippet using SparkML models in AutoML:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "# prepare your data in pandas-on-spark format as we previously mentioned\n",
      "\n",
      "automl = flaml.AutoML()\n",
      "settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"estimator_list\": [\"lgbm_spark\"],  # this setting is optional\n",
      "    \"task\": \"regression\",\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=psdf,\n",
      "    label=label,\n",
      "    **settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)\n",
      "\n",
      "## Parallel Spark Jobs\n",
      "\n",
      "You can activate Spark as the parallel backend during parallel tuning in both [AutoML](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning) and [Hyperparameter Tuning](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning), by setting the `use_spark` to `true`. FLAML will dispatch your job to the distributed Spark backend using [`joblib-spark`](https://github.com/joblib/joblib-spark).\n",
      "\n",
      "Please note that you should not set `use_spark` to `true` when applying AutoML and Tuning for Spark Data. This is because only SparkML models will be used for Spark Data in AutoML and Tuning. As SparkML models run in parallel, there is no need to distribute them with `use_spark` again.\n",
      "\n",
      "All the Spark-related arguments are stated below. These arguments are available in both Hyperparameter Tuning and AutoML:\n",
      "\n",
      "- `use_spark`: boolean, default=False | Whether to use spark to run the training in parallel spark jobs. This can be used to accelerate training on large models and large datasets, but will incur more overhead in time and thus slow down training in some cases. GPU training is not supported yet when use_spark is True. For Spark clusters, by default, we will launch one trial per executor. However, sometimes we want to launch more trials than the number of executors (e.g., local mode). In this case, we can set the environment variable `FLAML_MAX_CONCURRENT` to override the detected `num_executors`. The final number of concurrent trials will be the minimum of `n_concurrent_trials` and `num_executors`.\n",
      "- `n_concurrent_trials`: int, default=1 | The number of concurrent trials. When n_concurrent_trials > 1, FLAML performes parallel tuning.\n",
      "- `force_cancel`: boolean, default=False | Whether to forcely cancel Spark jobs if the search time exceeded the time budget. Spark jobs include parallel tuning jobs and Spark-based model training jobs.\n",
      "\n",
      "An example code snippet for using parallel Spark jobs:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "automl_experiment = flaml.AutoML()\n",
      "automl_settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"task\": \"regression\",\n",
      "    \"n_concurrent_trials\": 2,\n",
      "    \"use_spark\": True,\n",
      "    \"force_cancel\": True,  # Activating the force_cancel option can immediately halt Spark jobs once they exceed the allocated time_budget.\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=dataframe,\n",
      "    label=label,\n",
      "    **automl_settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)\n",
      "\n",
      "\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[33mBoss_Assistant\u001b[0m (to chat_manager):\n",
      "\n",
      "You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the\n",
      "context provided by the user.\n",
      "If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.\n",
      "For code generation, you must obey the following rules:\n",
      "Rule 1. You MUST NOT install any packages because all the packages needed are already installed.\n",
      "Rule 2. You must follow the formats below to write your code:\n",
      "```language\n",
      "# your code\n",
      "```\n",
      "\n",
      "User's question is: How to use spark for parallel training in FLAML? Give me sample code.\n",
      "\n",
      "Context is: # Integrate - Spark\n",
      "\n",
      "FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:\n",
      "\n",
      "- Use Spark ML estimators for AutoML.\n",
      "- Use Spark to run training in parallel spark jobs.\n",
      "\n",
      "## Spark ML Estimators\n",
      "\n",
      "FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.\n",
      "\n",
      "### Data\n",
      "\n",
      "For Spark estimators, AutoML only consumes Spark data. FLAML provides a convenient function `to_pandas_on_spark` in the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark (`pyspark.pandas`) dataframe/series, which Spark estimators require.\n",
      "\n",
      "This utility function takes data in the form of a `pandas.Dataframe` or `pyspark.sql.Dataframe` and converts it into a pandas-on-spark dataframe. It also takes `pandas.Series` or `pyspark.sql.Dataframe` and converts it into a [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) series. If you pass in a `pyspark.pandas.Dataframe`, it will not make any changes.\n",
      "\n",
      "This function also accepts optional arguments `index_col` and `default_index_type`.\n",
      "\n",
      "- `index_col` is the column name to use as the index, default is None.\n",
      "- `default_index_type` is the default index type, default is \"distributed-sequence\". More info about default index type could be found on Spark official [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)\n",
      "\n",
      "Here is an example code snippet for Spark Data:\n",
      "\n",
      "```python\n",
      "import pandas as pd\n",
      "from flaml.automl.spark.utils import to_pandas_on_spark\n",
      "\n",
      "# Creating a dictionary\n",
      "data = {\n",
      "    \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n",
      "    \"Age_Years\": [20, 15, 10, 7, 25],\n",
      "    \"Price\": [100000, 200000, 300000, 240000, 120000],\n",
      "}\n",
      "\n",
      "# Creating a pandas DataFrame\n",
      "dataframe = pd.DataFrame(data)\n",
      "label = \"Price\"\n",
      "\n",
      "# Convert to pandas-on-spark dataframe\n",
      "psdf = to_pandas_on_spark(dataframe)\n",
      "```\n",
      "\n",
      "To use Spark ML models you need to format your data appropriately. Specifically, use [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to merge all feature columns into a single vector column.\n",
      "\n",
      "Here is an example of how to use it:\n",
      "\n",
      "```python\n",
      "from pyspark.ml.feature import VectorAssembler\n",
      "\n",
      "columns = psdf.columns\n",
      "feature_cols = [col for col in columns if col != label]\n",
      "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
      "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n",
      "```\n",
      "\n",
      "Later in conducting the experiment, use your pandas-on-spark data like non-spark data and pass them using `X_train, y_train` or `dataframe, label`.\n",
      "\n",
      "### Estimators\n",
      "\n",
      "#### Model List\n",
      "\n",
      "- `lgbm_spark`: The class for fine-tuning Spark version LightGBM models, using [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API.\n",
      "\n",
      "#### Usage\n",
      "\n",
      "First, prepare your data in the required format as described in the previous section.\n",
      "\n",
      "By including the models you intend to try in the `estimators_list` argument to `flaml.automl`, FLAML will start trying configurations for these models. If your input is Spark data, FLAML will also use estimators with the `_spark` postfix by default, even if you haven't specified them.\n",
      "\n",
      "Here is an example code snippet using SparkML models in AutoML:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "# prepare your data in pandas-on-spark format as we previously mentioned\n",
      "\n",
      "automl = flaml.AutoML()\n",
      "settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"estimator_list\": [\"lgbm_spark\"],  # this setting is optional\n",
      "    \"task\": \"regression\",\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=psdf,\n",
      "    label=label,\n",
      "    **settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)\n",
      "\n",
      "## Parallel Spark Jobs\n",
      "\n",
      "You can activate Spark as the parallel backend during parallel tuning in both [AutoML](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning) and [Hyperparameter Tuning](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning), by setting the `use_spark` to `true`. FLAML will dispatch your job to the distributed Spark backend using [`joblib-spark`](https://github.com/joblib/joblib-spark).\n",
      "\n",
      "Please note that you should not set `use_spark` to `true` when applying AutoML and Tuning for Spark Data. This is because only SparkML models will be used for Spark Data in AutoML and Tuning. As SparkML models run in parallel, there is no need to distribute them with `use_spark` again.\n",
      "\n",
      "All the Spark-related arguments are stated below. These arguments are available in both Hyperparameter Tuning and AutoML:\n",
      "\n",
      "- `use_spark`: boolean, default=False | Whether to use spark to run the training in parallel spark jobs. This can be used to accelerate training on large models and large datasets, but will incur more overhead in time and thus slow down training in some cases. GPU training is not supported yet when use_spark is True. For Spark clusters, by default, we will launch one trial per executor. However, sometimes we want to launch more trials than the number of executors (e.g., local mode). In this case, we can set the environment variable `FLAML_MAX_CONCURRENT` to override the detected `num_executors`. The final number of concurrent trials will be the minimum of `n_concurrent_trials` and `num_executors`.\n",
      "- `n_concurrent_trials`: int, default=1 | The number of concurrent trials. When n_concurrent_trials > 1, FLAML performes parallel tuning.\n",
      "- `force_cancel`: boolean, default=False | Whether to forcely cancel Spark jobs if the search time exceeded the time budget. Spark jobs include parallel tuning jobs and Spark-based model training jobs.\n",
      "\n",
      "An example code snippet for using parallel Spark jobs:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "automl_experiment = flaml.AutoML()\n",
      "automl_settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"task\": \"regression\",\n",
      "    \"n_concurrent_trials\": 2,\n",
      "    \"use_spark\": True,\n",
      "    \"force_cancel\": True,  # Activating the force_cancel option can immediately halt Spark jobs once they exceed the allocated time_budget.\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=dataframe,\n",
      "    label=label,\n",
      "    **automl_settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)\n",
      "\n",
      "\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n",
      "\n",
      "```python\n",
      "from flaml.automl import AutoML\n",
      "from flaml.automl.spark.utils import to_pandas_on_spark\n",
      "from pyspark.ml.feature import VectorAssembler\n",
      "import pandas as pd\n",
      "\n",
      "# Sample data in a dictionary\n",
      "data = {\n",
      "    \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n",
      "    \"Age_Years\": [20, 15, 10, 7, 25],\n",
      "    \"Price\": [100000, 200000, 300000, 240000, 120000],\n",
      "}\n",
      "\n",
      "# Convert dictionary to pandas DataFrame\n",
      "dataframe = pd.DataFrame(data)\n",
      "label = \"Price\"\n",
      "\n",
      "# Convert pandas DataFrame to pandas-on-spark DataFrame\n",
      "psdf = to_pandas_on_spark(dataframe)\n",
      "\n",
      "# Use VectorAssembler to merge feature columns into a single vector column\n",
      "feature_cols = [col for col in psdf.columns if col != label]\n",
      "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
      "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\", label]\n",
      "\n",
      "# Initialize AutoML instance\n",
      "automl = AutoML()\n",
      "\n",
      "# AutoML settings\n",
      "automl_settings = {\n",
      "    \"time_budget\": 30,  # Total running time in seconds\n",
      "    \"metric\": \"r2\",     # Evaluation metric\n",
      "    \"task\": \"regression\",\n",
      "    \"n_concurrent_trials\": 2,   # Number of concurrent Spark jobs\n",
      "    \"use_spark\": True,          # Enable Spark for parallel training\n",
      "    \"force_cancel\": True,       # Force cancel Spark jobs if they exceed the time budget\n",
      "    \"estimator_list\": [\"lgbm_spark\"]  # Optional: Specific estimator to use\n",
      "}\n",
      "\n",
      "# Run AutoML fit with pandas-on-spark dataframe\n",
      "automl.fit(\n",
      "    dataframe=psdf,\n",
      "    label=label,\n",
      "    **automl_settings,\n",
      ")\n",
      "```\n",
      "TERMINATE\n",
      "\n",
      "--------------------------------------------------------------------------------\n"
     ]
    }
   ],
   "source": [
    "rag_chat()\n",
    "# type exit to terminate the chat"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Call RetrieveUserProxyAgent while init chat with another user proxy agent\n",
    "Sometimes, there might be a need to use RetrieveUserProxyAgent in group chat without initializing the chat with it. In such scenarios, it becomes essential to create a function that wraps the RAG agents and allows them to be called from other agents."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[33mBoss\u001b[0m (to chat_manager):\n",
      "\n",
      "How to use spark for parallel training in FLAML? Give me sample code.\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n",
      "\n",
      "\u001b[32m***** Suggested function call: retrieve_content *****\u001b[0m\n",
      "Arguments: \n",
      "{\"message\":\"using Apache Spark for parallel training in FLAML with sample code\"}\n",
      "\u001b[32m*****************************************************\u001b[0m\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[35m\n",
      ">>>>>>>> EXECUTING FUNCTION retrieve_content...\u001b[0m\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Number of requested results 3 is greater than number of elements in index 1, updating n_results = 1\n",
      "Model gpt4-1106-preview not found. Using cl100k_base encoding.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "VectorDB returns doc_ids:  [['bdfbc921']]\n",
      "\u001b[32mAdding content of doc bdfbc921 to context.\u001b[0m\n",
      "\u001b[33mBoss\u001b[0m (to chat_manager):\n",
      "\n",
      "\u001b[32m***** Response from calling function (retrieve_content) *****\u001b[0m\n",
      "You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the\n",
      "context provided by the user.\n",
      "If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.\n",
      "For code generation, you must obey the following rules:\n",
      "Rule 1. You MUST NOT install any packages because all the packages needed are already installed.\n",
      "Rule 2. You must follow the formats below to write your code:\n",
      "```language\n",
      "# your code\n",
      "```\n",
      "\n",
      "User's question is: using Apache Spark for parallel training in FLAML with sample code\n",
      "\n",
      "Context is: # Integrate - Spark\n",
      "\n",
      "FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:\n",
      "\n",
      "- Use Spark ML estimators for AutoML.\n",
      "- Use Spark to run training in parallel spark jobs.\n",
      "\n",
      "## Spark ML Estimators\n",
      "\n",
      "FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.\n",
      "\n",
      "### Data\n",
      "\n",
      "For Spark estimators, AutoML only consumes Spark data. FLAML provides a convenient function `to_pandas_on_spark` in the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark (`pyspark.pandas`) dataframe/series, which Spark estimators require.\n",
      "\n",
      "This utility function takes data in the form of a `pandas.Dataframe` or `pyspark.sql.Dataframe` and converts it into a pandas-on-spark dataframe. It also takes `pandas.Series` or `pyspark.sql.Dataframe` and converts it into a [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) series. If you pass in a `pyspark.pandas.Dataframe`, it will not make any changes.\n",
      "\n",
      "This function also accepts optional arguments `index_col` and `default_index_type`.\n",
      "\n",
      "- `index_col` is the column name to use as the index, default is None.\n",
      "- `default_index_type` is the default index type, default is \"distributed-sequence\". More info about default index type could be found on Spark official [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)\n",
      "\n",
      "Here is an example code snippet for Spark Data:\n",
      "\n",
      "```python\n",
      "import pandas as pd\n",
      "from flaml.automl.spark.utils import to_pandas_on_spark\n",
      "\n",
      "# Creating a dictionary\n",
      "data = {\n",
      "    \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n",
      "    \"Age_Years\": [20, 15, 10, 7, 25],\n",
      "    \"Price\": [100000, 200000, 300000, 240000, 120000],\n",
      "}\n",
      "\n",
      "# Creating a pandas DataFrame\n",
      "dataframe = pd.DataFrame(data)\n",
      "label = \"Price\"\n",
      "\n",
      "# Convert to pandas-on-spark dataframe\n",
      "psdf = to_pandas_on_spark(dataframe)\n",
      "```\n",
      "\n",
      "To use Spark ML models you need to format your data appropriately. Specifically, use [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to merge all feature columns into a single vector column.\n",
      "\n",
      "Here is an example of how to use it:\n",
      "\n",
      "```python\n",
      "from pyspark.ml.feature import VectorAssembler\n",
      "\n",
      "columns = psdf.columns\n",
      "feature_cols = [col for col in columns if col != label]\n",
      "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
      "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n",
      "```\n",
      "\n",
      "Later in conducting the experiment, use your pandas-on-spark data like non-spark data and pass them using `X_train, y_train` or `dataframe, label`.\n",
      "\n",
      "### Estimators\n",
      "\n",
      "#### Model List\n",
      "\n",
      "- `lgbm_spark`: The class for fine-tuning Spark version LightGBM models, using [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API.\n",
      "\n",
      "#### Usage\n",
      "\n",
      "First, prepare your data in the required format as described in the previous section.\n",
      "\n",
      "By including the models you intend to try in the `estimators_list` argument to `flaml.automl`, FLAML will start trying configurations for these models. If your input is Spark data, FLAML will also use estimators with the `_spark` postfix by default, even if you haven't specified them.\n",
      "\n",
      "Here is an example code snippet using SparkML models in AutoML:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "# prepare your data in pandas-on-spark format as we previously mentioned\n",
      "\n",
      "automl = flaml.AutoML()\n",
      "settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"estimator_list\": [\"lgbm_spark\"],  # this setting is optional\n",
      "    \"task\": \"regression\",\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=psdf,\n",
      "    label=label,\n",
      "    **settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)\n",
      "\n",
      "## Parallel Spark Jobs\n",
      "\n",
      "You can activate Spark as the parallel backend during parallel tuning in both [AutoML](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning) and [Hyperparameter Tuning](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning), by setting the `use_spark` to `true`. FLAML will dispatch your job to the distributed Spark backend using [`joblib-spark`](https://github.com/joblib/joblib-spark).\n",
      "\n",
      "Please note that you should not set `use_spark` to `true` when applying AutoML and Tuning for Spark Data. This is because only SparkML models will be used for Spark Data in AutoML and Tuning. As SparkML models run in parallel, there is no need to distribute them with `use_spark` again.\n",
      "\n",
      "All the Spark-related arguments are stated below. These arguments are available in both Hyperparameter Tuning and AutoML:\n",
      "\n",
      "- `use_spark`: boolean, default=False | Whether to use spark to run the training in parallel spark jobs. This can be used to accelerate training on large models and large datasets, but will incur more overhead in time and thus slow down training in some cases. GPU training is not supported yet when use_spark is True. For Spark clusters, by default, we will launch one trial per executor. However, sometimes we want to launch more trials than the number of executors (e.g., local mode). In this case, we can set the environment variable `FLAML_MAX_CONCURRENT` to override the detected `num_executors`. The final number of concurrent trials will be the minimum of `n_concurrent_trials` and `num_executors`.\n",
      "- `n_concurrent_trials`: int, default=1 | The number of concurrent trials. When n_concurrent_trials > 1, FLAML performes parallel tuning.\n",
      "- `force_cancel`: boolean, default=False | Whether to forcely cancel Spark jobs if the search time exceeded the time budget. Spark jobs include parallel tuning jobs and Spark-based model training jobs.\n",
      "\n",
      "An example code snippet for using parallel Spark jobs:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "automl_experiment = flaml.AutoML()\n",
      "automl_settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"task\": \"regression\",\n",
      "    \"n_concurrent_trials\": 2,\n",
      "    \"use_spark\": True,\n",
      "    \"force_cancel\": True,  # Activating the force_cancel option can immediately halt Spark jobs once they exceed the allocated time_budget.\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=dataframe,\n",
      "    label=label,\n",
      "    **automl_settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)\n",
      "\n",
      "\n",
      "\u001b[32m*************************************************************\u001b[0m\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[33mBoss\u001b[0m (to chat_manager):\n",
      "\n",
      "\u001b[32m***** Response from calling function (retrieve_content) *****\u001b[0m\n",
      "You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the\n",
      "context provided by the user.\n",
      "If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.\n",
      "For code generation, you must obey the following rules:\n",
      "Rule 1. You MUST NOT install any packages because all the packages needed are already installed.\n",
      "Rule 2. You must follow the formats below to write your code:\n",
      "```language\n",
      "# your code\n",
      "```\n",
      "\n",
      "User's question is: using Apache Spark for parallel training in FLAML with sample code\n",
      "\n",
      "Context is: # Integrate - Spark\n",
      "\n",
      "FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:\n",
      "\n",
      "- Use Spark ML estimators for AutoML.\n",
      "- Use Spark to run training in parallel spark jobs.\n",
      "\n",
      "## Spark ML Estimators\n",
      "\n",
      "FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.\n",
      "\n",
      "### Data\n",
      "\n",
      "For Spark estimators, AutoML only consumes Spark data. FLAML provides a convenient function `to_pandas_on_spark` in the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark (`pyspark.pandas`) dataframe/series, which Spark estimators require.\n",
      "\n",
      "This utility function takes data in the form of a `pandas.Dataframe` or `pyspark.sql.Dataframe` and converts it into a pandas-on-spark dataframe. It also takes `pandas.Series` or `pyspark.sql.Dataframe` and converts it into a [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) series. If you pass in a `pyspark.pandas.Dataframe`, it will not make any changes.\n",
      "\n",
      "This function also accepts optional arguments `index_col` and `default_index_type`.\n",
      "\n",
      "- `index_col` is the column name to use as the index, default is None.\n",
      "- `default_index_type` is the default index type, default is \"distributed-sequence\". More info about default index type could be found on Spark official [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)\n",
      "\n",
      "Here is an example code snippet for Spark Data:\n",
      "\n",
      "```python\n",
      "import pandas as pd\n",
      "from flaml.automl.spark.utils import to_pandas_on_spark\n",
      "\n",
      "# Creating a dictionary\n",
      "data = {\n",
      "    \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n",
      "    \"Age_Years\": [20, 15, 10, 7, 25],\n",
      "    \"Price\": [100000, 200000, 300000, 240000, 120000],\n",
      "}\n",
      "\n",
      "# Creating a pandas DataFrame\n",
      "dataframe = pd.DataFrame(data)\n",
      "label = \"Price\"\n",
      "\n",
      "# Convert to pandas-on-spark dataframe\n",
      "psdf = to_pandas_on_spark(dataframe)\n",
      "```\n",
      "\n",
      "To use Spark ML models you need to format your data appropriately. Specifically, use [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to merge all feature columns into a single vector column.\n",
      "\n",
      "Here is an example of how to use it:\n",
      "\n",
      "```python\n",
      "from pyspark.ml.feature import VectorAssembler\n",
      "\n",
      "columns = psdf.columns\n",
      "feature_cols = [col for col in columns if col != label]\n",
      "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
      "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n",
      "```\n",
      "\n",
      "Later in conducting the experiment, use your pandas-on-spark data like non-spark data and pass them using `X_train, y_train` or `dataframe, label`.\n",
      "\n",
      "### Estimators\n",
      "\n",
      "#### Model List\n",
      "\n",
      "- `lgbm_spark`: The class for fine-tuning Spark version LightGBM models, using [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API.\n",
      "\n",
      "#### Usage\n",
      "\n",
      "First, prepare your data in the required format as described in the previous section.\n",
      "\n",
      "By including the models you intend to try in the `estimators_list` argument to `flaml.automl`, FLAML will start trying configurations for these models. If your input is Spark data, FLAML will also use estimators with the `_spark` postfix by default, even if you haven't specified them.\n",
      "\n",
      "Here is an example code snippet using SparkML models in AutoML:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "# prepare your data in pandas-on-spark format as we previously mentioned\n",
      "\n",
      "automl = flaml.AutoML()\n",
      "settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"estimator_list\": [\"lgbm_spark\"],  # this setting is optional\n",
      "    \"task\": \"regression\",\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=psdf,\n",
      "    label=label,\n",
      "    **settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)\n",
      "\n",
      "## Parallel Spark Jobs\n",
      "\n",
      "You can activate Spark as the parallel backend during parallel tuning in both [AutoML](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning) and [Hyperparameter Tuning](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning), by setting the `use_spark` to `true`. FLAML will dispatch your job to the distributed Spark backend using [`joblib-spark`](https://github.com/joblib/joblib-spark).\n",
      "\n",
      "Please note that you should not set `use_spark` to `true` when applying AutoML and Tuning for Spark Data. This is because only SparkML models will be used for Spark Data in AutoML and Tuning. As SparkML models run in parallel, there is no need to distribute them with `use_spark` again.\n",
      "\n",
      "All the Spark-related arguments are stated below. These arguments are available in both Hyperparameter Tuning and AutoML:\n",
      "\n",
      "- `use_spark`: boolean, default=False | Whether to use spark to run the training in parallel spark jobs. This can be used to accelerate training on large models and large datasets, but will incur more overhead in time and thus slow down training in some cases. GPU training is not supported yet when use_spark is True. For Spark clusters, by default, we will launch one trial per executor. However, sometimes we want to launch more trials than the number of executors (e.g., local mode). In this case, we can set the environment variable `FLAML_MAX_CONCURRENT` to override the detected `num_executors`. The final number of concurrent trials will be the minimum of `n_concurrent_trials` and `num_executors`.\n",
      "- `n_concurrent_trials`: int, default=1 | The number of concurrent trials. When n_concurrent_trials > 1, FLAML performes parallel tuning.\n",
      "- `force_cancel`: boolean, default=False | Whether to forcely cancel Spark jobs if the search time exceeded the time budget. Spark jobs include parallel tuning jobs and Spark-based model training jobs.\n",
      "\n",
      "An example code snippet for using parallel Spark jobs:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "\n",
      "automl_experiment = flaml.AutoML()\n",
      "automl_settings = {\n",
      "    \"time_budget\": 30,\n",
      "    \"metric\": \"r2\",\n",
      "    \"task\": \"regression\",\n",
      "    \"n_concurrent_trials\": 2,\n",
      "    \"use_spark\": True,\n",
      "    \"force_cancel\": True,  # Activating the force_cancel option can immediately halt Spark jobs once they exceed the allocated time_budget.\n",
      "}\n",
      "\n",
      "automl.fit(\n",
      "    dataframe=dataframe,\n",
      "    label=label,\n",
      "    **automl_settings,\n",
      ")\n",
      "```\n",
      "\n",
      "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)\n",
      "\n",
      "\n",
      "\u001b[32m*************************************************************\u001b[0m\n",
      "\n",
      "--------------------------------------------------------------------------------\n",
      "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n",
      "\n",
      "To use Apache Spark for parallel training in FLAML, you can follow these steps:\n",
      "\n",
      "1. Ensure your data is in the required pandas-on-spark format.\n",
      "2. Use Spark ML estimators by including them in the `estimator_list`.\n",
      "3. Set `use_spark` to `True` for parallel tuning.\n",
      "\n",
      "Here's a sample code demonstrating how to use Spark for parallel training in FLAML:\n",
      "\n",
      "```python\n",
      "import flaml\n",
      "from flaml.automl.spark.utils import to_pandas_on_spark\n",
      "import pandas as pd\n",
      "from pyspark.ml.feature import VectorAssembler\n",
      "\n",
      "# Sample data in a pandas DataFrame\n",
      "data = {\n",
      "    \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n",
      "    \"Age_Years\": [20, 15, 10, 7, 25],\n",
      "    \"Price\": [100000, 200000, 300000, 240000, 120000],\n",
      "}\n",
      "label = \"Price\"\n",
      "\n",
      "# Creating a pandas DataFrame\n",
      "dataframe = pd.DataFrame(data)\n",
      "\n",
      "# Convert to pandas-on-spark dataframe\n",
      "psdf = to_pandas_on_spark(dataframe)\n",
      "\n",
      "# Prepare features using VectorAssembler\n",
      "columns = psdf.columns\n",
      "feature_cols = [col for col in columns if col != label]\n",
      "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
      "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n",
      "\n",
      "# Initialize AutoML\n",
      "automl = flaml.AutoML()\n",
      "\n",
      "# Configure settings for AutoML\n",
      "settings = {\n",
      "    \"time_budget\": 30,  # time budget in seconds\n",
      "    \"metric\": \"r2\",\n",
      "    \"estimator_list\": [\"lgbm_spark\"],  # using Spark ML estimators\n",
      "    \"task\": \"regression\",\n",
      "    \"n_concurrent_trials\": 2,  # number of parallel trials\n",
      "    \"use_spark\": True,  # enable parallel training using Spark\n",
      "    \"force_cancel\": True,  # force cancel Spark jobs if time_budget is exceeded\n",
      "}\n",
      "\n",
      "# Start the training\n",
      "automl.fit(dataframe=psdf, label=label, **settings)\n",
      "```\n",
      "\n",
      "In this code snippet:\n",
      "- The `to_pandas_on_spark` function is used to convert the pandas DataFrame to a pandas-on-spark DataFrame.\n",
      "- `VectorAssembler` is used to transform feature columns into a single vector column.\n",
      "- The `AutoML` object is created, and settings are configured for the AutoML run, including setting `use_spark` to `True` for parallel training.\n",
      "- The `fit` method is called to start the automated machine learning process.\n",
      "\n",
      "By using these settings, FLAML will train the models in parallel using Spark, which can accelerate the training process on large models and datasets.\n",
      "\n",
      "TERMINATE\n",
      "\n",
      "--------------------------------------------------------------------------------\n"
     ]
    }
   ],
   "source": [
    "call_rag_chat()"
   ]
  }
 ],
 "metadata": {
  "front_matter": {
   "description": "Implement and manage a multi-agent chat system using AutoGen, where AI assistants retrieve information, generate code, and interact collaboratively to solve complex tasks, especially in areas not covered by their training data.",
   "tags": [
    "group chat",
    "orchestration",
    "RAG"
   ]
  },
  "kernelspec": {
   "display_name": "flaml",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
